/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/***********************************************************************
 *   $Id: droppartitionprocessor.cpp 6567 2010-04-27 19:45:29Z rdempsey $
 *
 *
 ***********************************************************************/
#include "droppartitionprocessor.h"

#include "messagelog.h"
#include "sqllogger.h"
#include "cacheutils.h"
#include "oamcache.h"
#include "logicalpartition.h"

using namespace std;
using namespace execplan;
using namespace logging;
using namespace WriteEngine;
using namespace oam;

namespace ddlpackageprocessor
{
/** @brief Process a DROP PARTITION DDL statement.
 *
 * Sequence performed by this function:
 *   1. Refuse to run when fDbrm->isReadWrite() reports a non-zero status.
 *   2. Down-cast the statement and obtain a unique 64-bit id from DBRM.
 *   3. Resolve the table in the system catalog and acquire a table lock,
 *      retrying every 100 ms for Config::getWaitPeriod() seconds.
 *   4. Collect the column, AUX column and dictionary OIDs (only those > 3000).
 *   5. Mark the requested partitions for deletion in the extent map.
 *   6. Write the partition log file, remove the partition files, flush the
 *      PrimProc cache and delete the partitions from the extent map.
 *   7. Log the DDL statement, release the table lock, delete the log file and
 *      commit.
 *
 * Every failure exit calls fSessionManager.rolledback(txnID); failures that
 * happen inside the main try block also attempt to release the table lock.
 *
 * @param sqlStmt the statement to run; must be a ddlpackage::DropPartitionStatement.
 * @return a DDLResult whose `result` is NO_ERROR on success, otherwise one of
 *         DROP_ERROR, ALTER_ERROR, USER_ERROR, PARTITION_WARNING or
 *         WARN_NO_PARTITION, with `message` describing the problem.
 */
DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackageInternal(
    ddlpackage::SqlStatement* sqlStmt)
{
  SUMMARY_INFO("DropPartitionProcessor::processPackage");

  DDLResult result;
  result.result = NO_ERROR;
  std::string err;
  int rc = 0;
  rc = fDbrm->isReadWrite();
  BRM::TxnID txnID;
  txnID.id = fTxnid.id;
  txnID.valid = fTxnid.valid;

  if (rc != 0)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("Unable to execute the statement due to DBRM is read only");
    message.format(args);
    result.result = DROP_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnID);
    return result;
  }

  auto* dropPartitionStmt = dynamic_cast<ddlpackage::DropPartitionStatement*>(sqlStmt);
  if (!dropPartitionStmt)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("DropPartitionStatement wrong cast");
    message.format(args);
    // NOTE(review): ALTER_ERROR looks inherited from another processor; it is
    // kept unchanged so callers that inspect the code see the same value.
    result.result = ALTER_ERROR;
    result.message = message;
    // Roll back here as well, like every other failure exit of this function.
    fSessionManager.rolledback(txnID);
    return result;
  }

  VERBOSE_INFO(dropPartitionStmt);
  // Commit current transaction.
  // all DDL statements cause an implicit commit
  VERBOSE_INFO("Getting current txnID");

  std::vector<CalpontSystemCatalog::OID> oidList;
  CalpontSystemCatalog::OID tableAuxColOid;
  CalpontSystemCatalog::RIDList tableColRidList;
  CalpontSystemCatalog::DictOIDList dictOIDList;
  execplan::CalpontSystemCatalog::ROPair roPair;
  uint32_t processID = 0;
  uint64_t uniqueID = 0;  // table lock id returned by getTableLock(); 0 means "not acquired"
  uint32_t sessionID = dropPartitionStmt->fSessionID;
  std::string processName("DDLProc");
  uint64_t uniqueId = 0;  // unique number from getUnique64(), passed to the log/file helpers

  // Bug 5070. Added exception handling
  try
  {
    uniqueId = fDbrm->getUnique64();
  }
  catch (const std::exception& ex)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add(ex.what());
    message.format(args);
    result.result = ALTER_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnID);
    return result;
  }
  catch (...)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("Unknown error occurred while getting unique number.");
    message.format(args);
    result.result = ALTER_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnID);
    return result;
  }

  auto stmt = formatStatementString(dropPartitionStmt->fSql, dropPartitionStmt->fTableName->fSchema,
                                    dropPartitionStmt->fTableName->fName, dropPartitionStmt->fPartitions);
  SQLLogger logger(stmt, fDDLLoggingId, sessionID, txnID.id);
  ostringstream debugLog;

  try
  {
    // check table lock
    boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
        CalpontSystemCatalog::makeCalpontSystemCatalog(dropPartitionStmt->fSessionID);
    systemCatalogPtr->identity(CalpontSystemCatalog::EC);
    systemCatalogPtr->sessionID(dropPartitionStmt->fSessionID);
    CalpontSystemCatalog::TableName tableName;
    tableName.schema = dropPartitionStmt->fTableName->fSchema;
    tableName.table = dropPartitionStmt->fTableName->fName;
    roPair = systemCatalogPtr->tableRID(tableName);

    //@Bug 3054 check for system catalog
    if (roPair.objnum < 3000)
    {
      throw std::runtime_error("Drop partition cannot be operated on Calpont system catalog.");
    }

    processID = ::getpid();
    oam::OamCache* oamcache = OamCache::makeOamCache();
    std::vector<int> pmList = oamcache->getModuleIds();
    std::vector<uint32_t> pms;
    pms.reserve(pmList.size());

    for (const int pmId : pmList)
    {
      pms.push_back((uint32_t)pmId);
    }

    try
    {
      uniqueID = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, (int32_t*)&sessionID,
                                     (int32_t*)&txnID.id, BRM::LOADING);
    }
    catch (const std::exception&)
    {
      result.result = DROP_ERROR;
      result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
      // no need to release lock. dbrm un-hold the lock
      fSessionManager.rolledback(txnID);
      return result;
    }

    if (uniqueID == 0)
    {
      // The lock was not granted: poll for it until the configured wait period expires.
      const int sleepTime = 100;  // sleep 100 milliseconds between checks
      const int waitPeriod = Config::getWaitPeriod();
      const int numTries = waitPeriod * 10;  // try 10 times per second
      int attempt = 0;

      for (; attempt < numTries; attempt++)
      {
        // Start every attempt from the full interval. nanosleep() overwrites
        // rm_ts with the unslept remainder when it is interrupted, so a value
        // carried over from a previous attempt would shorten later sleeps.
        struct timespec rm_ts;
        rm_ts.tv_sec = sleepTime / 1000;
        rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
        struct timespec abs_ts;

        // Resume the sleep with the remaining time whenever nanosleep() fails.
        do
        {
          abs_ts.tv_sec = rm_ts.tv_sec;
          abs_ts.tv_nsec = rm_ts.tv_nsec;
        } while (nanosleep(&abs_ts, &rm_ts) < 0);

        // reset: getTableLock() receives these by pointer, so restore the
        // values describing this process before asking again.
        sessionID = dropPartitionStmt->fSessionID;
        txnID.id = fTxnid.id;
        txnID.valid = fTxnid.valid;
        processID = ::getpid();
        processName = "DDLProc";

        try
        {
          uniqueID = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, (int32_t*)&sessionID,
                                         (int32_t*)&txnID.id, BRM::LOADING);
        }
        catch (const std::exception&)
        {
          result.result = DROP_ERROR;
          result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
          fSessionManager.rolledback(txnID);
          return result;
        }

        if (uniqueID > 0)
          break;
      }

      if (attempt >= numTries)  // error out
      {
        result.result = DROP_ERROR;
        logging::Message::Args args;
        string strOp("drop partition");
        args.add(strOp);
        args.add(processName);
        args.add((uint64_t)processID);
        args.add((uint64_t)sessionID);
        result.message = Message(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
        fSessionManager.rolledback(txnID);
        return result;
      }
    }
    debugLog << "DropPartitionProcessor: got table lock (id) " << uniqueID << " for table (OID)"
             << roPair.objnum << endl;

    // 1. Get the OIDs for the columns
    // 2. Get the OIDs for the dictionaries
    // 3. Save the OIDs to a log file
    // 4. Disable the extents from extentmap for the partition
    // 5. Remove the column and dictionary  files for the partition
    // 6. Flush PrimProc Cache
    // 7. Remove the extents from extentmap for the partition

    tableColRidList = systemCatalogPtr->columnRIDs(tableName);
    tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(tableName);
    dictOIDList = systemCatalogPtr->dictOIDs(tableName);

    debugLog << "DropPartitionProcessor:" << endl;
    debugLog << "column RIDS: ";
    for (const auto& rid : tableColRidList)
      debugLog << rid.objnum << " ";
    debugLog << endl;
    debugLog << "dict OIDS: ";
    for (const auto& dictOid : dictOIDList)
      debugLog << dictOid.dictOID << " ";
    debugLog << endl;

    // Save qualified tablename, all column, dictionary OIDs, and transaction ID into a file in ASCII format
    for (const auto& rid : tableColRidList)
    {
      if (rid.objnum > 3000)
        oidList.push_back(rid.objnum);
    }

    if (tableAuxColOid > 3000)
    {
      oidList.push_back(tableAuxColOid);
    }

    for (const auto& dictOid : dictOIDList)
    {
      if (dictOid.dictOID > 3000)
        oidList.push_back(dictOid.dictOID);
    }

    // oidList[0] is dereferenced below; fail cleanly instead of indexing an
    // empty vector. rc is still 0 here, so the handler reports DROP_ERROR.
    if (oidList.empty())
    {
      throw std::runtime_error("Drop partition failed: no column or dictionary OIDs found for the table.");
    }

    // Mark the partition disabled from extent map
    string emsg;
    rc = fDbrm->markPartitionForDeletion(oidList, dropPartitionStmt->fPartitions, emsg);

    // The three codes below are tolerated here; any other non-zero code aborts.
    if (rc != 0 && rc != BRM::ERR_PARTITION_DISABLED && rc != BRM::ERR_INVALID_OP_LAST_PARTITION &&
        rc != BRM::ERR_NOT_EXIST_PARTITION)
    {
      throw std::runtime_error(emsg);
    }
    debugLog << "DropPartitionProcessor: marked partitions for deletion:" << endl;
    for (const auto& partition : dropPartitionStmt->fPartitions)
      debugLog << "dbroot: " << partition.dbroot << " physical parition: " << partition.pp
               << " segment: " << partition.seg << endl;
    VERBOSE_INFO(debugLog.str());
    // Empty the buffer; clear() alone would only reset the stream state flags.
    debugLog.str("");
    debugLog.clear();

    set<BRM::LogicalPartition> markedPartitions;
    set<BRM::LogicalPartition> outOfServicePartitions;

    // only log partitions that are successfully marked disabled.
    rc = fDbrm->getOutOfServicePartitions(oidList[0], outOfServicePartitions);

    if (rc != 0)
    {
      string errorMsg;
      BRM::errString(rc, errorMsg);
      ostringstream oss;
      oss << "getOutOfServicePartitions failed  due to " << errorMsg;
      throw std::runtime_error(oss.str());
    }

    // Keep only the requested partitions that are reported as out of service.
    for (const auto& partition : dropPartitionStmt->fPartitions)
    {
      if (outOfServicePartitions.find(partition) != outOfServicePartitions.end())
        markedPartitions.insert(partition);
    }

    // Save the oids to a file
    createWritePartitionLogFile(roPair.objnum, markedPartitions, oidList, uniqueId);

    VERBOSE_INFO("Removing parition files");
    removePartitionFiles(oidList, markedPartitions, uniqueId);
    // Flush PrimProc cache for those lbids
    VERBOSE_INFO("Flushing paritions");
    // NOTE(review): this return code is overwritten by deletePartition() below
    // without being checked.
    rc = cacheutils::flushPartition(oidList, markedPartitions);

    // Remove the partition from extent map
    emsg.clear();
    rc = fDbrm->deletePartition(oidList, dropPartitionStmt->fPartitions, emsg);
    VERBOSE_INFO("DropPartitionProcessor: partitions removed");

    if (rc != 0)
      throw std::runtime_error(emsg);
  }
  catch (const exception& ex)
  {
    cerr << "DropPartitionProcessor::processPackage: " << ex.what() << endl;

    logging::Message message(ex.what());

    // Translate the last BRM return code into the result reported to the caller.
    if (rc == BRM::ERR_TABLE_NOT_LOCKED)
      result.result = USER_ERROR;
    else if (rc == BRM::ERR_NOT_EXIST_PARTITION || rc == BRM::ERR_INVALID_OP_LAST_PARTITION)
      result.result = PARTITION_WARNING;
    else if (rc == BRM::ERR_NO_PARTITION_PERFORMED)
      result.result = WARN_NO_PARTITION;
    else
      result.result = DROP_ERROR;

    result.message = message;

    try
    {
      fDbrm->releaseTableLock(uniqueID);
    }
    catch (const std::exception&)
    {
      result.result = DROP_ERROR;
      result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
    }

    fSessionManager.rolledback(txnID);
    return result;
  }
  catch (...)
  {
    cerr << "DropPartitionProcessor::processPackage: caught unknown exception!" << endl;

    logging::Message::Args args;
    logging::Message message(1);
    args.add("Drop partition failed: ");
    args.add("encountered unkown exception");
    args.add("");
    args.add("");
    message.format(args);

    result.result = DROP_ERROR;
    result.message = message;

    try
    {
      fDbrm->releaseTableLock(uniqueID);
    }
    catch (const std::exception&)
    {
      result.result = DROP_ERROR;
      result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
    }

    fSessionManager.rolledback(txnID);
    return result;
  }

  // Log the DDL statement
  logging::logDDL(dropPartitionStmt->fSessionID, txnID.id, dropPartitionStmt->fSql,
                  dropPartitionStmt->fOwner);

  // Remove the log file
  // release the transaction
  try
  {
    fDbrm->releaseTableLock(uniqueID);
    deleteLogFile(DROPPART_LOG, roPair.objnum, uniqueId);
  }
  catch (const std::exception&)
  {
    result.result = DROP_ERROR;
    result.message = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
    fSessionManager.rolledback(txnID);
    return result;
  }

  fSessionManager.committed(txnID);
  VERBOSE_INFO("DropPartitionProcessor:: commited");
  return result;
}

}  // namespace ddlpackageprocessor
